[AWS IoT Core] AWS IoT Device SDK v2 for Python でMQTT5 のサポートが始まりました (Developer Preview)
1 はじめに
CX 事業本部のデリバリー部の平内(SIN)です。
re:Invent 2022 で AWS IoT Core の MQTT v5 対応が発表された。
AWS IoT announces general availability for version 5 of MQTT message broker (MQTT5)
発表当初、AWS SDK は、MQTT v5 未対応で、公式の Blog でも、paho-mqtt を使用した例が紹介されていました。
Introducing new MQTTv5 features for AWS IoT Core to help build flexible architecture patterns
しかし、先ほど、何気なく確認すると、AWS IoT Device SDK v2 for Python で、開発者プレビューですが、MQTT v 5対応がリリースされていました。
https://github.com/aws/aws-iot-device-sdk-python-v2/releases
今回は、簡単ですが、Python SDK での MQTT v5 接続を試してみました。
2 AWS IoT Device SDK v2 for Python
MQTT v5 に関するドキュメントは、以下です。
https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/documents/MQTT5.md
まだ、サンプルコードを見ると、MQTT v5 での実装方法が、一発で把握できます。
https://github.com/aws/aws-iot-device-sdk-python-v2/tree/main/samples
3 最小サンプル
aws-iot-device-sdk-python-v2/samples/mqtt5_pubsub.py を参考にさせて頂いて、X509 証明書による TLS 接続のみの最小のサンプルを作成してみました。
最初に、awsiotsdkをアップデートしましたが、1.12.0となっていました。
% pip3 install -U awsiotsdk % pip3 freeze | grep aws awscrt==0.16.0 awsiotsdk==1.12.0
コードです。 接続後、1 回メッセージを送受信し、3 秒後に終了します。
import os import time import threading from concurrent.futures import Future from awsiot import mqtt5_client_builder from awscrt import mqtt5 endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" port = 8883 dir = os.path.dirname(os.path.abspath(__file__)) certs = { "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir), "certfile": "{}/certificates/client-cert.pem".format(dir), "keyfile": "{}/certificates/private-key.pem".format(dir), } client_id = "client_id" topic = "sensor/device01" payload = "message" TIMEOUT = 100 received_all_event = threading.Event() future_stopped = Future() future_connection_success = Future() def on_publish_received(publish_packet_data): publish_packet = publish_packet_data.publish_packet print("on_publish_received topic:{} payload:{}".format(publish_packet.topic, publish_packet.payload)) def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData): print("on_lifecycle_stopped") global future_stopped future_stopped.set_result(lifecycle_stopped_data) def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData): print("on_lifecycle_connection_success") global future_connection_success future_connection_success.set_result(lifecycle_connect_success_data) def on_lifecycle_connection_failure(lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData): print("Lifecycle Connection Failure") print("Connection failed with exception:{}".format(lifecycle_connection_failure.exception)) if __name__ == '__main__': client = mqtt5_client_builder.mtls_from_path( endpoint=endpoint, port=port, cert_filepath=certs["certfile"], pri_key_filepath=certs["keyfile"], ca_filepath=certs["cafile"], on_publish_received=on_publish_received, on_lifecycle_stopped=on_lifecycle_stopped, on_lifecycle_connection_success=on_lifecycle_connection_success, on_lifecycle_connection_failure=on_lifecycle_connection_failure, client_id=client_id) client.start() lifecycle_connect_success_data = future_connection_success.result(TIMEOUT) connack_packet = lifecycle_connect_success_data.connack_packet negotiated_settings = lifecycle_connect_success_data.negotiated_settings # Subscribe print("Subscribing to topic '{}'...".format(topic)) subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket( subscriptions=[mqtt5.Subscription( topic_filter=topic, qos=mqtt5.QoS.AT_LEAST_ONCE)] )) suback = subscribe_future.result(TIMEOUT) print("Subscribed with {}".format(suback.reason_codes)) print("Publishing message to topic '{}': {}".format(topic,payload)) publish_future = client.publish(mqtt5.PublishPacket( topic=topic, payload=payload, qos=mqtt5.QoS.AT_LEAST_ONCE )) publish_completion_data = publish_future.result(TIMEOUT) print("PubAck received with {}".format(repr(publish_completion_data.puback.reason_code))) # wait 3 sec print("wait 3 sec") time.sleep(3) # Unsubscribe print("Unsubscribing from topic '{}'".format(topic)) unsubscribe_future = client.unsubscribe(unsubscribe_packet=mqtt5.UnsubscribePacket( topic_filters=[topic])) unsuback = unsubscribe_future.result(TIMEOUT) print("Unsubscribed with {}".format(unsuback.reason_codes)) print("Stopping Client") client.stop() future_stopped.result(TIMEOUT) print("Client Stopped!")
実行結果は、以下のようになります。
% python3 index.py on_lifecycle_connection_success Subscribing to topic 'sensor/device01'... Subscribed with [<SubackReasonCode.GRANTED_QOS_1: 1>] Publishing message to topic 'sensor/device01': message PubAck received with <PubackReasonCode.SUCCESS: 0> wait 3 sec on_publish_received topic:sensor/device01 payload:b'message' Unsubscribing from topic 'sensor/device01' Unsubscribed with [<UnsubackReasonCode.SUCCESS: 0>] Stopping Client on_lifecycle_stopped Client Stopped!
4 最後に
今回は、簡単ですが、Python SDK での MQTT v5 接続を試してみました。
SDK も対応して、いよいよ、MQTT v5 使っていこう!ってところですが・・・まだ、開発者プレビューであることにご注意ください。
5 参考リンク
[AWS IoT Core] MQTT v5 を使用してリクエスト・レスポンス パターンを実装して見ました
[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました
[AWS IoT Core] MQTT v5 を使用してトピック・エイリアスを実装して見ました
[AWS IoT Core] MQTT v5 を使用してメッセージ及び、セッション有効期限とクリーンスタートを実装して見ました
[AWS IoT Core] MQTT v5 を使用してレスポンスコードの確認を実装して見ました
[AWS IoT Core] MQTT v5 を使用してフォーマット識別要素で判別する Payload のパースを実装して見ました
[AWS IoT Core] MQTT v5 に対応した 「MQTT テストクライアント」の動作を確認して見ました
[AWS IoT Core] MQTT v5 で追加されたユーザープロパティを ルール で取得して Lambda で使用してみました